第14章 智能融合:结合内部知识与外部搜索的RAG
学习目标
- 理解在RAG中融合内部知识库与外部Web搜索的必要性
- 掌握使用LangChain Agent动态决策检索源(向量库 vs. Web搜索)的方法
- 学习如何定义和使用自定义工具(Web搜索)并通过Function Calling调用本地服务
- 了解如何整合来自不同来源(向量库、Web搜索)的信息以增强生成效果
- 实现一个能够智能判断并融合内外知识的复杂RAG系统 (使用DeepSeek和本地GTE模型)
为何需要融合外部搜索?
上一节我们构建了基于内部文档的RAG系统,它在回答与已加载知识相关的问题时表现出色。然而,真实世界的问答场景往往更复杂:
- 知识时效性:内部知识库可能无法包含最新的事件、数据或发展动态。
- 知识覆盖面:用户的问题可能超出内部知识库的范围。
- 事实核查:对于某些内部信息,可能需要通过外部Web搜索进行交叉验证。
为了克服这些限制,我们需要将RAG系统的"视野"从内部扩展到广阔的互联网,实现内部知识(精确、专业)与外部信息(实时、广泛)的智能融合。
架构升级:引入决策智能与外部工具
为了实现这种融合,我们需要对基础RAG架构进行升级:
- 决策中心 (Agent):引入一个智能体(Agent),它能分析用户查询,并决定是应该查询内部向量数据库、执行外部Web搜索,还是两者结合。
- 外部工具 (Web Search Tool):定义一个Web搜索工具,使其能够被Agent调用,并连接到本地SearXNG服务。
- Function Calling / Tool Use:利用大语言模型(如DeepSeek)的工具使用能力,让Agent能够精确地调用我们定义的Web搜索工具。
- 结果整合:设计策略来合并来自向量数据库和Web搜索的结果,形成最终的增强上下文。
参考课程视频中的内容
使用LangChain Agent实现智能决策
LangChain Agent 提供了实现这种决策逻辑的强大框架。我们将使用支持工具使用的Agent(如 OpenAIFunctionsAgent
,假设DeepSeek兼容其格式或使用通用Agent),并赋予它访问内部知识库和本地SearXNG服务的能力。
1. 定义工具 (Tools)
我们需要定义至少两个工具:
- 内部知识库检索工具 (Vector DB Retriever Tool):用于查询我们之前构建的Chroma向量数据库(使用本地GTE模型)。
- Web搜索工具 (Web Search Tool):用于调用本地
localhost:1234
的SearXNG服务。
python
from langchain.agents import Tool
from langchain.tools import BaseTool
from langchain.pydantic_v1 import BaseModel, Field
from typing import Type
import requests # 用于调用外部API
import json
# 工具1:内部知识库检索工具
# (假设 vectorstore 和 retriever 在之前的代码中已基于本地GTE模型初始化)
# retriever = vectorstore.as_retriever(search_kwargs={"k": 3}) # 确保 retriever 已定义
def search_vector_db(query: str):
"""当需要查询内部知识库(项目文档、专业流程、历史数据等)时使用此工具。"""
# 确保 retriever 在此作用域可用或作为类成员访问
if retriever:
docs = retriever.get_relevant_documents(query)
return "\\n---\\n".join([doc.page_content for doc in docs])
else:
return "内部知识库检索器未初始化。"
vector_db_tool = Tool(
name="InternalKnowledgeBase",
func=search_vector_db,
description="查询内部文档和知识库。适用于查询特定项目信息、内部流程、历史数据等。"
)
# 工具2:Web搜索工具 (调用本地 SearXNG)
# 定义输入模式
class WebSearchInput(BaseModel):
query: str = Field(description="需要进行网络搜索的查询语句")
# --- 函数:调用本地 SearXNG API ---
def call_searxng_api(query: str, searxng_url: str = "http://localhost:1234"):
"""
调用本地 SearXNG 实例 (运行在 http://localhost:1234) 进行 Web 搜索。
"""
print(f"--- [实时] 正在调用 SearXNG API ({searxng_url}),查询:{query} ---")
try:
params = {"q": query, "format": "json"}
# 使用你的本地SearXNG地址
response = requests.get(f"{searxng_url}/search", params=params, timeout=10)
response.raise_for_status() # 检查请求是否成功
data = response.json()
# 提取和格式化结果 (根据你的SearXNG输出调整)
results_summary = []
for result in data.get("results", [])[:3]: # 取前3条结果
title = result.get("title", "无标题")
content = result.get("content", result.get("snippet", "无摘要")) # 尝试获取snippet
url = result.get("url", "#")
results_summary.append(f"标题: {title}\\n摘要: {content}\\n链接: {url}")
if not results_summary:
return "未能从SearXNG获取到有效结果。"
return "\\n---\\n".join(results_summary)
except requests.exceptions.RequestException as e:
print(f"调用 SearXNG 出错: {e}")
return f"Web搜索失败: 无法连接到SearXNG服务 ({searxng_url}) 或请求失败 - {e}"
except json.JSONDecodeError:
print(f"解析 SearXNG 响应失败。内容: {response.text[:200]}...") # 打印部分响应内容
return "Web搜索失败:无法解析SearXNG响应。"
except Exception as e:
print(f"处理 SearXNG 结果时发生未知错误: {e}")
return f"Web搜索失败: {e}"
# --- 创建 Web Search Tool ---
class WebSearchTool(BaseTool):
name = "WebSearch"
description = "当需要获取实时信息、最新事件、广泛知识、或验证内部信息时,使用此工具通过本地SearXNG服务进行网络搜索。"
args_schema: Type[BaseModel] = WebSearchInput
def _run(self, query: str):
"""使用工具。"""
# 调用实际的API函数
return call_searxng_api(query)
async def _arun(self, query: str):
"""异步使用工具。"""
# 实际应用中需要使用异步HTTP库如 aiohttp
# 这里为了简单起见,仍然使用同步调用
# import asyncio
# loop = asyncio.get_event_loop()
# return await loop.run_in_executor(None, call_searxng_api, query)
return call_searxng_api(query) # 保持简单
web_search_tool = WebSearchTool()
# 将所有工具放入列表
tools = [vector_db_tool, web_search_tool]
- 注意:
call_searxng_api
函数现在会真正尝试连接http://localhost:1234
。确保你的 SearXNG 服务在该地址运行,并且网络配置允许从运行此代码的环境访问它。同时,错误处理也得到了加强。
2. 初始化Agent (使用DeepSeek)
我们将使用 ChatDeepSeek
作为LLM,并继续尝试使用 OpenAIFunctionsAgent
(假设DeepSeek的工具使用API与OpenAI兼容)。
python
from langchain.agents import OpenAIFunctionsAgent, AgentExecutor
from langchain.prompts import MessagesPlaceholder
from langchain.memory import ConversationBufferMemory
from langchain_community.chat_models import ChatDeepSeek # 导入DeepSeek
from langchain.schema import SystemMessage
# 初始化LLM - 使用DeepSeek
# !! 确保已设置 DEEPSEEK_API_KEY 环境变量 !!
try:
llm = ChatDeepSeek(model="deepseek-chat", temperature=0)
# 测试LLM是否可用 (可选)
# llm.invoke("你好")
except Exception as e:
print(f"初始化 DeepSeek LLM 失败: {e}")
print("请确保 langchain-community 已安装并且 DEEPSEEK_API_KEY 环境变量已设置。")
llm = None # 标记LLM不可用
if llm:
# 设置系统消息,指导Agent如何决策
system_message = SystemMessage(content=(
"你是一个智能问答助手。"
"请仔细分析用户的问题,判断所需信息是更可能存在于内部知识库还是需要通过本地SearXNG服务进行Web搜索。"
"内部知识库包含项目文档、历史记录和专业流程信息。"
"Web搜索用于获取最新事件、实时数据、广泛背景知识或验证信息。"
"如果问题模糊,优先考虑内部知识库,如果信息不足或需要最新信息,则使用Web搜索。"
"有时可能需要结合两者的信息来回答。"
"总是优先使用你拥有的工具来寻找答案。"
))
# 设置对话记忆
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# 创建Agent Prompt
# OpenAIFunctionsAgent 可能需要针对非OpenAI模型调整,但先尝试
try:
prompt = OpenAIFunctionsAgent.create_prompt(
system_message=system_message,
extra_prompt_messages=[MessagesPlaceholder(variable_name="chat_history")]
)
except Exception as e:
print(f"创建 Agent Prompt 失败 (可能与DeepSeek兼容性有关): {e}")
prompt = None # 标记Prompt创建失败
if prompt:
# 创建Agent
try:
agent = OpenAIFunctionsAgent(llm=llm, tools=tools, prompt=prompt)
except Exception as e:
print(f"创建 OpenAIFunctionsAgent 失败 (可能与DeepSeek兼容性有关): {e}")
# 可以考虑回退到更通用的Agent类型,如 ReAct
# from langchain.agents import AgentType, initialize_agent
# agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True, memory=memory, handle_parsing_errors=True)
agent = None # 标记Agent创建失败
if agent:
# 创建Agent执行器
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
memory=memory,
verbose=True, # 打印详细执行过程
handle_parsing_errors="将错误信息反馈给用户。" # 更友好的错误处理
)
print("DeepSeek Agent 执行器已创建。")
else:
agent_executor = None
print("Agent 创建失败,无法创建执行器。")
else:
agent_executor = None
print("Agent Prompt 创建失败,无法创建执行器。")
else:
agent_executor = None
print("LLM 初始化失败,无法创建 Agent 执行器。")
- 关键变更:使用了
ChatDeepSeek
,并添加了一些错误处理来检查LLM和Agent Prompt/Agent是否能成功初始化,因为非OpenAI模型与特定Agent类型的兼容性可能需要验证。
3. 执行Agent进行问答
执行方式不变,但现在Agent会调用真实的本地SearXNG服务。
python
# 假设 agent_executor 初始化成功
if agent_executor:
# 查询1:更可能在内部知识库
response1 = agent_executor.run("我们项目上一季度的关键性能指标是什么?")
print("Response 1:", response1)
# 查询2:更可能需要Web搜索
response2 = agent_executor.run("Explain the concept of Retrieval-Augmented Generation (RAG).") # 英文查询示例
print("Response 2:", response2)
# 查询3:可能需要结合两者
response3 = agent_executor.run("对比我们内部使用的Transformer模型和最新的市场上的MoE模型的主要优缺点。")
print("Response 3:", response3)
else:
print("Agent 未成功初始化,无法执行查询。")
结果整合策略
当Agent调用了多个工具(例如,先查内部库发现不足,再查Web),或者你希望主动结合两者信息时,需要考虑如何整合:
- 简单拼接:将向量数据库检索到的片段和Web搜索结果直接拼接到一起,作为LLM的上下文。这是Agent通常的默认行为。
- LLM Reranking/Summarization:在获取两方面信息后,再用一个LLM调用来对所有信息进行排序、筛选、去重或总结,生成更精炼的上下文。
- 加权融合:根据信息来源的可靠性或相关性,在Prompt中给予不同的强调或指示。
Agent框架通常会处理第一种简单拼接。对于更复杂的策略,你可能需要自定义Agent的逻辑或在Agent执行后进行额外的处理步骤。
完整代码示例 (使用DeepSeek和本地GTE)
下面的 SmartDocumentAssistant
类整合了上述变更。
python
import os
from langchain.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings # 用于本地模型
from langchain_community.chat_models import ChatDeepSeek # 使用DeepSeek
from langchain_chroma import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.prompts import MessagesPlaceholder
from langchain.schema import SystemMessage
from langchain.agents import Tool, OpenAIFunctionsAgent, AgentExecutor
from langchain.tools import BaseTool
from langchain.pydantic_v1 import BaseModel, Field
from typing import Type
import json
import requests
# --- 配置 ---
# !! 确保已设置 DEEPSEEK_API_KEY 环境变量 !!
# os.environ["DEEPSEEK_API_KEY"] = "your-deepseek-api-key"
DEFAULT_MODEL_DIR = "./gte-large" # <--- 使用 GTE 模型路径
DEFAULT_DOCS_DIR = "./documents"
DEFAULT_DB_DIR = "chroma_db_gte_deepseek" # <--- 更新DB目录名以反映配置
# 全局变量以供Tool函数访问 (或者更好的方式是封装在类中)
retriever = None
# --- Web Search Tool (同上) ---
class WebSearchInput(BaseModel):
query: str = Field(description="需要进行网络搜索的查询语句")
def call_searxng_api(query: str, searxng_url: str = "http://localhost:1234"):
"""调用本地 SearXNG 实例进行 Web 搜索。"""
print(f"--- [实时] 正在调用 SearXNG API ({searxng_url}),查询:{query} ---")
try:
params = {"q": query, "format": "json"}
response = requests.get(f"{searxng_url}/search", params=params, timeout=10)
response.raise_for_status()
data = response.json()
results_summary = []
for result in data.get("results", [])[:3]:
title = result.get("title", "无标题")
content = result.get("content", result.get("snippet", "无摘要"))
url = result.get("url", "#")
results_summary.append(f"标题: {title}\\n摘要: {content}\\n链接: {url}")
if not results_summary: return "未能从SearXNG获取到有效结果。"
return "\\n---\\n".join(results_summary)
except requests.exceptions.RequestException as e:
print(f"调用 SearXNG 出错: {e}")
return f"Web搜索失败: 无法连接到SearXNG服务 ({searxng_url}) 或请求失败 - {e}"
except json.JSONDecodeError:
print(f"解析 SearXNG 响应失败。内容: {response.text[:200]}...")
return "Web搜索失败:无法解析SearXNG响应。"
except Exception as e:
print(f"处理 SearXNG 结果时发生未知错误: {e}")
return f"Web搜索失败: {e}"
class WebSearchTool(BaseTool):
name = "WebSearch"
description = "当需要获取实时信息、最新事件、广泛知识、或验证内部信息时,使用此工具通过本地SearXNG服务 (localhost:1234) 进行网络搜索。"
args_schema: Type[BaseModel] = WebSearchInput
def _run(self, query: str): return call_searxng_api(query)
async def _arun(self, query: str): return call_searxng_api(query) # 保持简单
# --- 内部知识库搜索函数 (需要 retriever) ---
def search_vector_db_global(query: str):
"""当需要查询内部知识库(项目文档、专业流程、历史数据等)时使用此工具。"""
global retriever # 访问全局 retriever
if retriever:
try:
docs = retriever.get_relevant_documents(query)
if docs:
return "\\n---\\n".join([doc.page_content for doc in docs])
else:
return "在内部知识库中未找到相关文档。"
except Exception as e:
print(f"检索内部知识库时出错: {e}")
return "检索内部知识库失败。"
else:
return "内部知识库检索器未初始化。"
# --- 智能文档助手类 ---
class SmartDocumentAssistant:
def __init__(self, docs_dir=DEFAULT_DOCS_DIR, model_dir=DEFAULT_MODEL_DIR, db_dir=DEFAULT_DB_DIR):
self.docs_dir = docs_dir
self.model_dir = model_dir # 使用 GTE
self.db_dir = db_dir
self.vectorstore = None
self.agent_executor = None
self.embeddings = None
self.llm = None
self._setup()
def _init_embeddings(self):
try:
self.embeddings = HuggingFaceEmbeddings(
model_name=self.model_dir,
model_kwargs={'device': 'cpu'}, # 根据你的硬件调整 'cuda' 或 'mps'
encode_kwargs={'normalize_embeddings': True}
)
print(f"成功初始化本地嵌入模型: {self.model_dir}")
except Exception as e:
print(f"初始化本地嵌入模型失败 ({self.model_dir}): {e}")
print("请确保模型文件存在且依赖库 (sentence-transformers, torch等) 已安装。")
self.embeddings = None
def _init_llm(self):
try:
self.llm = ChatDeepSeek(model="deepseek-chat", temperature=0)
self.llm.invoke("测试连接") # 简单测试
print("成功初始化 DeepSeek LLM。")
except Exception as e:
print(f"初始化 DeepSeek LLM 失败: {e}")
print("请确保 langchain-community 已安装并且 DEEPSEEK_API_KEY 环境变量已设置。")
self.llm = None
def _load_documents(self):
"""加载文档 (示例,请根据实际情况填充)"""
documents = []
if not os.path.exists(self.docs_dir):
print(f"警告: 文档目录 {self.docs_dir} 不存在。")
return documents
for file in os.listdir(self.docs_dir):
file_path = os.path.join(self.docs_dir, file)
try:
if file.endswith('.pdf'):
loader = PyPDFLoader(file_path)
documents.extend(loader.load())
elif file.endswith('.docx'):
loader = Docx2txtLoader(file_path)
documents.extend(loader.load())
elif file.endswith('.txt'):
loader = TextLoader(file_path, encoding='utf-8') # 指定编码
documents.extend(loader.load())
except Exception as e:
print(f"加载文件 {file_path} 失败: {e}")
print(f"已加载 {len(documents)} 个文档片段")
return documents
def _process_documents(self):
"""处理文档并创建/保存向量库"""
if not self.embeddings:
print("嵌入模型未初始化,无法处理文档。")
return False
documents = self._load_documents()
if not documents:
print("未加载任何文档,无法创建知识库。")
return False
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, chunk_overlap=200,
separators=["\\n\\n", "\\n", "。", "!", "?", ".", " ", ""]
)
chunks = text_splitter.split_documents(documents)
if not chunks:
print("文档分割后没有得到任何块。")
return False
print(f"文档已分割为 {len(chunks)} 个块")
try:
self.vectorstore = Chroma.from_documents(
documents=chunks,
embedding_function=self.embeddings,
persist_directory=self.db_dir
)
self.vectorstore.persist()
print(f"向量存储已创建并保存至 {self.db_dir}")
return True
except Exception as e:
print(f"创建 Chroma 向量库失败: {e}")
self.vectorstore = None
return False
def _load_vectorstore(self):
"""加载现有向量库"""
if not self.embeddings:
print("嵌入模型未初始化,无法加载向量库。")
return False
if os.path.exists(self.db_dir):
try:
self.vectorstore = Chroma(
persist_directory=self.db_dir,
embedding_function=self.embeddings
)
global retriever # 设置全局 retriever
retriever = self.vectorstore.as_retriever(search_kwargs={"k": 3})
print(f"已从 {self.db_dir} 加载向量存储并设置检索器。")
return True
except Exception as e:
print(f"加载 Chroma 向量库失败 ({self.db_dir}): {e}")
self.vectorstore = None
return False
print(f"未找到向量存储目录: {self.db_dir}")
return False
def _setup_agent(self):
"""设置Agent"""
if not self.vectorstore or not self.llm:
print("向量库或LLM未初始化,无法设置Agent。")
return
# 1. 创建内部知识库工具 (使用全局函数)
vector_db_tool = Tool(
name="InternalKnowledgeBase",
func=search_vector_db_global, # 使用能访问全局retriever的函数
description="查询内部文档和知识库。适用于查询特定项目信息、内部流程、历史数据等。"
)
# 2. 创建Web搜索工具
web_search_tool = WebSearchTool()
# 3. 定义Agent所需组件
tools = [vector_db_tool, web_search_tool]
system_message = SystemMessage(content=(
"你是一个智能问答助手。"
"分析用户问题,判断信息来源:内部知识库(项目文档、历史数据)还是Web搜索(最新信息、广泛知识)。"
"优先使用工具获取信息来回答。"
"如果内部信息不足或需要最新消息,使用Web搜索。"
"综合利用信息给出回答。"
))
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
try:
prompt = OpenAIFunctionsAgent.create_prompt(
system_message=system_message,
extra_prompt_messages=[MessagesPlaceholder(variable_name="chat_history")]
)
agent = OpenAIFunctionsAgent(llm=self.llm, tools=tools, prompt=prompt)
self.agent_executor = AgentExecutor(
agent=agent,
tools=tools,
memory=memory,
verbose=True,
handle_parsing_errors="将错误信息反馈给用户。"
)
print("DeepSeek Agent 执行器已创建。")
except Exception as e:
print(f"创建 Agent (或 Executor) 失败: {e}")
print("可能需要尝试不同的 Agent 类型或检查 DeepSeek 的工具使用兼容性。")
self.agent_executor = None
def _setup(self):
"""初始化助手:加载或创建向量库,并设置Agent"""
print("开始初始化智能助手...")
self._init_embeddings()
self._init_llm()
if not self.embeddings or not self.llm:
print("基础组件(Embeddings或LLM)初始化失败,无法继续。")
return
if not self._load_vectorstore():
print("未加载现有向量库,尝试创建新的...")
if self._process_documents():
self._load_vectorstore() # 创建后重新加载
else:
print("创建新向量库失败。")
return # 无法继续
self._setup_agent()
print("智能助手初始化完成。")
def ask(self, question):
"""使用Agent回答问题"""
if not self.agent_executor:
return "智能Agent未初始化或初始化失败。"
try:
# 清理问题中的潜在危险字符可能不是必须的,但作为示例
# import re
# safe_question = re.sub(r'[^\w\s,.?!\u4e00-\u9fff]+', '', question)
response = self.agent_executor.run(question)
return response
except Exception as e:
print(f"Agent 执行时出错: {e}")
# 可以考虑打印更详细的traceback
# import traceback
# traceback.print_exc()
return f"处理您的请求时遇到错误。请稍后再试或尝试不同的问法。错误: {e}"
# --- 使用示例 ---
if __name__ == "__main__":
# 确保文档目录存在
if not os.path.exists(DEFAULT_DOCS_DIR):
os.makedirs(DEFAULT_DOCS_DIR)
print(f"创建了文档目录: {DEFAULT_DOCS_DIR}, 请放入文档文件 (pdf, docx, txt)")
# 确保模型目录存在 (这里不创建,假设用户已下载)
if not os.path.exists(DEFAULT_MODEL_DIR):
print(f"警告: 嵌入模型目录 {DEFAULT_MODEL_DIR} 不存在。请确保已下载 GTE 模型到该路径。")
print("正在初始化助手,请稍候...")
assistant = SmartDocumentAssistant()
if assistant.agent_executor:
print("\n智能文档助手已就绪。输入'退出'结束。")
while True:
try:
question = input("\n请输入问题: ")
if question.lower() in ['退出', 'exit', 'quit']:
break
if not question:
continue
response = assistant.ask(question)
print("\n助手回答:", response)
except EOFError: # 处理管道输入结束等情况
break
except KeyboardInterrupt:
print("\n用户中断。")
break
else:
print("\n助手未能成功初始化,无法启动交互式问答。请检查上面的错误信息。")
print(f"请确认: ")
print(f"1. DEEPSEEK_API_KEY 环境变量已正确设置。")
print(f"2. SearXNG 服务正在 http://localhost:1234 运行。")
print(f"3. GTE 嵌入模型已下载到 {DEFAULT_MODEL_DIR} 目录。")
print(f"4. 相关 Python 库已安装 (langchain, langchain-community, langchain-chroma, sentence-transformers, requests, pydantic, etc.)。")
思考题
- Agent在决策使用哪个工具(内部库 vs Web搜索)时,可能依赖哪些因素?如何优化这个决策过程?
- 融合Web搜索结果可能引入噪声或不准确信息,如何设计机制来过滤或验证Web搜索的内容?
- 调用外部Web搜索API会增加延迟和潜在成本,如何在保证信息时效性的同时,控制这些开销?
- 当内部知识库的信息与Web搜索结果冲突时,系统应该如何处理?优先哪个来源?
- 除了简单的工具选择,Agent是否可以实现更复杂的融合策略,例如,先进行Web搜索获取背景,再用背景信息优化内部知识库的查询?
通过引入Agent和外部工具(如Web搜索),RAG系统变得更加强大和灵活,能够更好地应对现实世界中复杂多变的问答需求。接下来的章节我们将探讨如何部署和评估这样的高级RAG系统。